package com.atguigu.champter7.time;

import com.atguigu.beans.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.time.Duration;

public class Flink02_Time_Event {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setInteger("rest.port",9999);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(1);


        SingleOutputStreamOperator<String> mainStream = env
                .socketTextStream("hadoop104", 9999)
                .map(line -> {
                    String[] watersensors = line.split(",");
                    return new WaterSensor(watersensors[0], Long.valueOf(watersensors[1]), Integer.valueOf(watersensors[2]));
                })
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(0))
                                .withTimestampAssigner((e,ts) ->e.getTs())
                )
                .keyBy(WaterSensor::getId)
                .process(new KeyedProcessFunction<String, WaterSensor, String>() {
                    @Override
                    public void onTimer(long timestamp, KeyedProcessFunction<String, WaterSensor, String>.OnTimerContext ctx, Collector<String> out) throws Exception {
                        System.out.println("O泡时间 :"+timestamp);
                        out.collect("o泡时间到!!!!!");
                    }

                    @Override
                    public void processElement(WaterSensor value,
                                               Context ctx,
                                               Collector<String> out) throws Exception {
                        if (value.getVc() == 20){
                            ctx.timerService().registerEventTimeTimer(value.getTs() + 3000);
                        }

                    }
                });
        mainStream.print();
        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
